# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""I/O operators
.. currentmodule:: hysop.operator.hdf_io
* :class:`~HDF_Writer` : operator to write fields into an hdf file
* :class:`~HDF_Reader` : operator to read fields from an hdf file
* :class:`~HDF_IO` abstract interface for hdf io classes
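A minimal usage sketch, assuming ``velo`` is an already defined
:class:`~hysop.fields.continuous_field.Field` and ``topo`` a valid
topology descriptor (both illustrative, not provided by this module)::

    from hysop.operator.hdf_io import HDF_Writer, HDF_Reader
    writer = HDF_Writer(variables={velo: topo})   # dump velo to hdf5/xdmf
    reader = HDF_Reader(variables={velo: topo})   # read velo back in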
"""
import subprocess
import sys
import os
import functools
from abc import ABCMeta, abstractmethod
try:
import h5py
except ImportError as e:
h5py = None
msg = "Warning: h5py not found, you may not be able to"
msg += " use hdf5 I/O functionnalities."
print(msg)
raise
from hysop import __H5PY_PARALLEL_COMPRESSION_ENABLED__, vprint
from hysop.core.graph.graph import discretized
from hysop.constants import (
DirectionLabels,
HYSOP_REAL,
Backend,
TranspositionState,
MemoryOrdering,
)
from hysop.tools.decorators import debug
from hysop.tools.htypes import check_instance, first_not_None
from hysop.tools.numpywrappers import npw
from hysop.tools.io_utils import IO, IOParams, XMF
from hysop.core.graph.graph import op_apply
from hysop.backend.host.host_operator import HostOperatorBase
from hysop.fields.continuous_field import Field
from hysop.topology.cartesian_descriptor import CartesianTopologyDescriptors
from hysop.core.memory.memory_request import MemoryRequest
from hysop.topology.topology_descriptor import TopologyDescriptor
class HDF_IO(HostOperatorBase, metaclass=ABCMeta):
"""
Abstract interface to read/write hysop fields
from/to hdf files.
"""
@classmethod
def supported_backends(cls):
"""
Return the backends that this operator's topologies can support.
"""
return Backend.all
def __new__(
cls, var_names=None, name_prefix="", name_postfix="", force_backend=None, **kwds
):
return super().__new__(cls, **kwds)
def __init__(
self,
var_names=None,
name_prefix="",
name_postfix="",
force_backend=None,
**kwds,
):
"""Read/write some fields data from/into hdf/xmdf files.
Parallel io.
Parameters
----------
var_names : a dictionnary, optional
keys = :class:`~hysop.fields.continuous_field.Field`,
values = string, field name. See notes below.
name_prefix: str, optional
Optional name prefix for variables.
name_postfix: str, optional
Optional name postfix for variables.
force_backend: hysop.constants.Backend
Force the source backend for fields.
kwds: dict
Base class arguments.
Notes
-----
Datasets in hdf files are identified by name. In hysop, when
writing a file, the default dataset name is built from the
continuous field name, the component index and the optional
name_prefix/name_postfix, but this can be overridden with the
var_names argument.
For example, if input_fields=[velo, vorti] and the hdf file
contains the keys 'vel_1_X, vel_1_Y, vel_1_Z, dat_2_X, dat_2_Y,
dat_2_Z', use var_names = {velo: 'vel', vorti: 'dat'} to read
vel/dat into velo/vorti.
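A minimal naming sketch for writing (``velo`` and ``topo`` are
assumed to be an already defined 3d vector field and a valid
topology descriptor, both illustrative)::

    writer = HDF_Writer(variables={velo: topo},
                        var_names={velo: 'vel'},
                        name_prefix='hysop_')
    # datasets written for the three components:
    #   hysop_vel_0, hysop_vel_1, hysop_vel_2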
"""
check_instance(var_names, dict, keys=Field, values=str, allow_none=True)
super().__init__(**kwds)
self.name_prefix = name_prefix
self.name_postfix = name_postfix
if h5py is None:
msg = "You try to use HDF5 reader but h5py module"
msg += "has not been found on your system."
raise RuntimeError(msg)
# If no filename is given, set it to
# the concatenation of the input_fields' names.
if self.io_params is None:
# if name is not set, name = concatenation of fields names,
# like v1_v2.
# WARNING FP: names must be sorted (alphabetical order),
# otherwise the order may change from one mpi process
# to another.
names = sorted(var.name for var in self.input_fields)
name = "_".join(names)
self.io_params = IOParams(name, fileformat=IO.HDF5)
else:
assert self.io_params.fileformat is IO.HDF5
# Dictionary of names to search in the hdf file. May be None.
# It will be checked during setup.
self.var_names = {} if var_names is None else var_names
# Local topology, that MUST be common to all input_fields.
self.topology = None
self._local_compute_slices = None
self._global_grid_resolution = None
self._local_grid_resolution = None
self._all_local_grid_resolution = None
self._global_slices = None
# Dictionary of datasets. Key = name in hdf file,
# Value = data array of the corresponding discrete field component.
self.dataset = {}
# Get hdf file name. Depends on the read/write process. Must be
# defined in HDF_Reader or HDF_Writer init.
self._get_filename = lambda i=None: None
# File Object that holds hdf file
self._hdf_file = None
# field backend
self._force_backend = first_not_None(force_backend, Backend.HOST)
td_kwds = {}
if force_backend is Backend.OPENCL:
assert "cl_env" in kwds
td_kwds["cl_env"] = kwds.pop("cl_env")
self._td_kwds = td_kwds
@debug
def create_topology_descriptors(self):
"""
Called in get_field_requirements, just after handle_method
Topology requirements (or descriptors) are:
1) min and max ghosts for each input and output variable
2) allowed splitting directions for cartesian topologies
"""
# Here we recreate TopologyDescriptors to allow a forced backend,
# like an OpenCL mapped memory backend, or when we do not want
# to allocate memory for a topology that is just used for I/O.
td_kwds = self._td_kwds
for field, topo_descriptor in self.input_fields.items():
topo_descriptor = TopologyDescriptor.build_descriptor(
backend=self._force_backend,
operator=self,
field=field,
handle=topo_descriptor,
**td_kwds,
)
self.input_fields[field] = topo_descriptor
for field, topo_descriptor in self.output_fields.items():
topo_descriptor = TopologyDescriptor.build_descriptor(
backend=self._force_backend,
operator=self,
field=field,
handle=topo_descriptor,
**td_kwds,
)
self.output_fields[field] = topo_descriptor
@debug
def get_field_requirements(self):
# set good transposition state
requirements = super().get_field_requirements()
for is_input, ireq in requirements.iter_requirements():
if ireq is None:
continue
(field, td, req) = ireq
req.memory_order = MemoryOrdering.C_CONTIGUOUS
req.axes = (TranspositionState[field.dim].default_axes(),)
return requirements
def get_node_requirements(self):
node_reqs = super().get_node_requirements()
node_reqs.enforce_unique_transposition_state = True
node_reqs.enforce_unique_topology_shape = True
node_reqs.enforce_unique_memory_order = True
node_reqs.enforce_unique_ghosts = False
return node_reqs
def discretize(self):
super().discretize()
topo = self.input_fields[
next(iter(sorted(self.input_fields.keys(), key=lambda f: f.name)))
]
use_local_hdf5 = topo.cart_size == 1
use_local_hdf5 |= (
(topo.proc_shape[0] == topo.cart_size)
and (topo.cart_size <= 10)
and (not self.io_params.hdf5_disable_slicing)
)
# XDMF JOIN does not support more than 10 arguments
self.topology = topo
self.use_local_hdf5 = use_local_hdf5
self.use_parallel_hdf5 = not use_local_hdf5
refmesh = topo.mesh
# Global resolution for hdf5 output
self._global_grid_resolution = refmesh.grid_resolution
# Local resolution for hdf5 output
self._local_grid_resolution = refmesh.compute_resolution
assert self.io_params.io_leader < topo.cart_comm.size
self._all_local_grid_resolution = topo.cart_comm.gather(
self._local_grid_resolution, root=self.io_params.io_leader
)
local_compute_slices = {}
global_compute_slices = {}
for field, itopo in self.input_fields.items():
mesh = itopo.mesh
assert topo.domain._domain is itopo.domain._domain, "domain mismatch"
assert npw.array_equal(
refmesh.grid_resolution, mesh.grid_resolution
), "global grid resolution mismatch"
assert mesh.on_proc == refmesh.on_proc
if mesh.on_proc:
local_compute_slices[field] = mesh.local_compute_slices
global_compute_slices[field] = mesh.global_compute_slices
else:
local_compute_slices[field] = tuple(
slice(0, 0) for _ in range(self.domain.dim)
)
global_compute_slices[field] = tuple(
slice(0, 0) for _ in range(self.domain.dim)
)
self._local_compute_slices = local_compute_slices
self._global_compute_slices = global_compute_slices
self.refmesh = refmesh
name_prefix, name_postfix = self.name_prefix, self.name_postfix
fields_names = {}
# Get field names and initialize dataset dict.
for df in self.discrete_fields:
df_name = df.name
if df.continuous_fields()[0] in self.var_names.keys():
df_name = self.var_names[df.continuous_fields()[0]]
for d in range(df.nb_components):
if df.nb_components == 1:
name = name_prefix + df_name + name_postfix
else:
name = name_prefix + df_name + f"_{d}" + name_postfix
self.dataset[name] = df.data[d]
fields_names[df.field] = name
self.fields_names = fields_names
for f, name in self.fields_names.items():
assert f in self._local_compute_slices
assert f in self._global_compute_slices
self._local_compute_slices[name] = self._local_compute_slices[f]
self._global_compute_slices[name] = self._global_compute_slices[f]
def open_hdf(self, count, mode, compression="gzip"):
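"""Open the hdf5 file for input/output number `count` in the given mode.
Depending on the configuration this opens a single sequential file,
a single parallel file (mpio driver) or one file per rank, and returns
the (filename, compression) pair actually used.
"""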
filename = self._get_filename(count)
if self.topology.cart_size == 1:
self._hdf_file = h5py.File(filename, mode)
elif self.use_parallel_hdf5:
self._hdf_file = h5py.File(
filename, mode, driver="mpio", comm=self.topology.comm
)
# disable compression if the hdf5 library version is less than 1.10.2 (checked in CMakeLists.txt).
if not __H5PY_PARALLEL_COMPRESSION_ENABLED__:
compression = None
else:
filename = filename.format(rk=self.topology.cart_rank)
self._hdf_file = h5py.File(filename, mode)
if self.io_params.hdf5_disable_compression:
compression = None
return (filename, compression)
@classmethod
def supports_multiple_topologies(cls):
return True
@classmethod
def supports_mpi(cls):
return True
class HDF_Writer(HDF_IO):
"""
Write field(s) values on a given topology, in HDF5 format.
"""
__xmf_header = """<?xml version=\"1.0\" ?>
<!DOCTYPE Xdmf SYSTEM \"Xdmf.dtd\">
<Xdmf Version=\"2.0\">
<Domain>
<Grid Name=\"CellTime\" GridType=\"Collection\" CollectionType=\"Temporal\">
"""
__xmf_footer = """ </Grid>
</Domain>
</Xdmf>
"""
def __new__(cls, variables, name=None, pretty_name=None, **kwds):
return super().__new__(
cls,
input_fields=None,
output_fields=None,
name=name,
pretty_name=pretty_name,
**kwds,
)
def __init__(self, variables, name=None, pretty_name=None, **kwds):
"""
Write some fields data into hdf/xdmf files.
Parallel writing.
Parameters
----------
variables : dict
keys = :class:`~hysop.fields.continuous_field.Field`,
values = CartesianTopologyDescriptors.
kwds : base class arguments
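A minimal construction sketch (``velo`` and ``topo`` are assumed to be
an already defined field and topology descriptor, both illustrative)::

    from hysop.tools.io_utils import IO, IOParams
    io_params = IOParams('fields', fileformat=IO.HDF5)
    writer = HDF_Writer(variables={velo: topo}, io_params=io_params)

The operator is then inserted into a computational graph/problem
like any other hysop operator.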
"""
check_instance(variables, dict, keys=Field, values=CartesianTopologyDescriptors)
vnames = [f"{field.name}" for field in variables.keys()]
vpnames = [field.pretty_name for field in variables.keys()]
name = first_not_None(name, "write_{}".format("_".join(vnames)))
pname = first_not_None(pretty_name, "write_{}".format("_".join(vpnames)))
super().__init__(
input_fields=variables,
output_fields=None,
name=name,
pretty_name=pname,
**kwds,
)
# count the number of calls
self._count = 0
self.step = self._step_HDF5
self._xdmf_data_files = []
# filename = prefix_N, N = counter value
self._get_filename = self._input_fname
# An xdmf file cannot contain the same time value twice,
# so we keep track of the last time at which the xdmf file
# has been written, to raise an exception if that time is
# written again.
self._last_written_time = None
self._xmf_file = None
if self.io_params.append:
self.openXMFFile()
self._data_getters = {}
def get_work_properties(self, **kwds):
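"""Request a temporary host buffer large enough to hold the biggest
dataset living on a device backend that cannot be mapped to host
memory, so that device data can be copied to the host before dumping.
"""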
requests = super().get_work_properties(**kwds)
max_bytes = 0
for name, data in self.dataset.items():
if data.backend.kind == Backend.HOST:
continue
if data.backend.kind == Backend.OPENCL:
from hysop.backend.device.opencl import cl
if data.backend.device.type == cl.device_type.CPU:
continue
# we need a host buffer to get the data
max_bytes = max(data.nbytes, max_bytes)
host_backend = data.backend.host_array_backend
if max_bytes > 0:
request = MemoryRequest(
backend=host_backend, size=max_bytes, dtype=npw.uint8
)
requests.push_mem_request(request_identifier="buffer", mem_request=request)
return requests
def setup(self, work, **kwds):
super().setup(work=work, **kwds)
self._setup_grid_template()
for name, data in self.dataset.items():
data = data[self._local_compute_slices[name]]
if data.backend.kind is Backend.HOST:
def get_data(data=data.handle):
return data
elif data.backend.kind is Backend.OPENCL:
from hysop.backend.device.opencl.opencl_copy_kernel_launchers import (
OpenClCopyBufferRectLauncher,
)
from hysop.backend.device.opencl import cl
if data.backend.device.type == cl.device_type.CPU:
def get_data(
data=data.handle, queue=data.backend.cl_env.default_queue
):
buf = data.map_to_host(
queue=queue, is_blocking=True, flags=cl.map_flags.READ
)
return buf
# unmap is called when buf is destroyed
else:
(buf,) = work.get_buffer(self, "buffer", handle=True)
assert buf.dtype == npw.uint8
assert buf.size >= data.nbytes
buf = buf[: data.nbytes].view(dtype=data.dtype).reshape(data.shape)
cpy = OpenClCopyBufferRectLauncher.from_slices(
varname=name, src=data, dst=buf
)
cpy = functools.partial(
cpy, queue=data.backend.cl_env.default_queue
)
def get_data(cpy=cpy, buf=buf):
cpy().wait()
return buf
else:
msg = "Data type not understood or unknown array backend."
raise NotImplementedError(msg)
self._data_getters[name] = get_data
def finalize(self):
if self._xmf_file:
filename = self._xmf_file.name
self._xmf_file.close()
if self.io_params.dump_is_temporary:
vprint(f">Deleting XMF file {filename}...")
os.remove(filename)
def _input_fname(self, i):
"""Set output file name for current iteration"""
msg = "count < 0, simu must be initialized."
assert i >= 0, msg
if (self.topology.cart_size == 1) or self.use_parallel_hdf5:
return self.io_params.filename + f"_{i:06d}" + ".h5"
else:
assert self.use_local_hdf5
return self.io_params.filename + f"_{i:06d}" + "_rk{rk:04d}.h5"
@op_apply
def apply(self, simulation=None, **kwds):
if simulation is None:
raise ValueError("Missing simulation value for monitoring.")
if self.io_params.should_dump(simulation=simulation):
if self._xmf_file is None:
self.createXMFFile()
self.step(simulation)
self._count += 1
def _setup_grid_template(self):
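"""Build the xdmf grid attributes template used by updateXMFFile.
Resolution, origin and space step are padded to three entries
before being passed to XMF.prepare_grid_attributes.
"""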
topo = self.topology
dim = topo.domain.dim
dx = list(topo.mesh.space_step)
mesh = self.refmesh
res = list(mesh.grid_resolution)
orig = list(topo.domain.origin)
resolution = [
1,
] * 3
origin = [
0.0,
] * 3
step = [
0.0,
] * 3
idim = 3 - dim
resolution[idim:] = res
origin[idim:] = orig
step[idim:] = dx
write_resolution = tuple(resolution)
write_origin = tuple(origin)
write_step = tuple(step)
ds_names = self.dataset.keys()
joinrkfiles = None
if self.use_local_hdf5 and (self.topology.cart_size > 1):
joinrkfiles = tuple(range(self.topology.cart_size))
grid_attributes = XMF.prepare_grid_attributes(
ds_names, resolution, origin, step, joinrkfiles=joinrkfiles
)
self.grid_attributes_template = grid_attributes
def createXMFFile(self):
"""Create and fill the header of the xdmf file."""
if self.mpi_params.rank == self.io_params.io_leader:
f = open(self.io_params.filename + ".xmf", "wb")
f.write(HDF_Writer.__xmf_header.encode())
self._last_xmf_pos = f.tell()
self._xmf_file = f
f.flush()
def openXMFFile(self):
"""Open an existing xdmf file."""
if self.mpi_params.rank == self.io_params.io_leader:
f = open(self.io_params.filename + ".xmf", "rb+")
f.seek(-len(HDF_Writer.__xmf_footer), 2) # Read from file end
self._last_xmf_pos = f.tell()
self._xmf_file = f
def updateXMFFile(self):
"""Update xdmf file."""
if self.mpi_params.rank == self.io_params.io_leader:
f = self._xmf_file
lastp = self._last_xmf_pos
assert f is not None
assert lastp is not None
for i, t in self._xdmf_data_files:
if (self.topology.cart_size == 1) or self.use_parallel_hdf5:
filenames = {"filename": self._get_filename(i).split("/")[-1]}
else:
filenames = {
"filename"
+ str(r): self._get_filename(i).format(rk=r).split("/")[-1]
for r in range(self.topology.cart_size)
}
filenames.update(
(
"resolution" + str(r),
XMF._list_format(self._all_local_grid_resolution[r]),
)
for r in range(self.topology.cart_size)
)
grid_attrs = self.grid_attributes_template.format(
niteration=i, time=t, **filenames
)
f.seek(lastp)
f.write(grid_attrs.encode())
self._last_xmf_pos = f.tell()
f.write(HDF_Writer.__xmf_footer.encode())
f.flush()
self._xdmf_data_files = []
def _step_HDF5(self, simu):
"""Write an h5 file with data on each mpi process.
If parallel interface of HDF5 is not enabled, each rank is
writing its own h5 file. All files are concatenated in the xmf
part with a 'JOIN' function. If parallel interface enabled,
only one h5 file is written by all ranks.
"""
# Remarks:
# - force np.float64, ParaView seems unable to read float32
# - writing compressed hdf5 files (gzip compression seems the best)
# - gzip compression does not work in parallel.
# Get 'current' filename. It depends on the number
# of the current output (count) and on the current process
# rank.
self._count = simu.current_iteration
(filename, compression) = self.open_hdf(self._count, mode="w")
vprint(
">Dumping {} {} HDF5 data to {}...".format(
"compressed" if compression else "uncompressed",
"parallel" if self.use_parallel_hdf5 else "",
filename,
)
)
# Get the names of the fields to dump and create the
# corresponding hdf5 datasets.
if self.use_local_hdf5:
for name in sorted(self.dataset):
ds = self._hdf_file.create_dataset(
name=name,
shape=self._local_grid_resolution,
dtype=self.dataset[name].dtype,
compression=compression,
track_times=False,
) # required if we want to compare checksums in tests
ds[...] = self._data_getters[name]()
elif self.use_parallel_hdf5:
for name in sorted(self.dataset):
ds = self._hdf_file.create_dataset(
name=name,
shape=self._global_grid_resolution,
dtype=self.dataset[name].dtype,
compression=compression,
track_times=False,
) # required if we want to compare checksums in tests
if compression is None:
# no need for collective here because we do not use any filter
ds[self._global_compute_slices[name]] = self._data_getters[name]()
else:
with ds.collective:
ds[self._global_compute_slices[name]] = self._data_getters[
name
]()
else:
msg = "Unknown HDF5 mode."
raise RuntimeError(msg)
# Collect data required to write the xdmf file
# --> add tuples (counter, time).
if simu.t() == self._last_written_time:
msg = "You cannot write two hdf files for the same "
msg += "(time, var) set. "
msg += "If you want to save a field two times for "
msg += "a single time value, please use two hdf_writer operators."
raise RuntimeError(msg)
self._xdmf_data_files.append((self._count, simu.t()))
self._last_written_time = simu.t()
self._hdf_file.close()
self.updateXMFFile()
if self.io_params.postprocess_dump:
postprocess_cmd = self.io_params.postprocess_dump
op_name = self.name
actual_filepath = self.io_params.filepath
disk_filepath = self.io_params.disk_filepath
xmf_file = self._xmf_file.name
hdf_file = filename
hdf_is_tmp = self.io_params.dump_is_temporary
iteration = self._count
time = self._last_written_time
vprint(f">Executing postprocessing script: {postprocess_cmd}")
# execute command OP_NAME ACTUAL_FILEPATH DISK_FILEPATH XMF_FILE HDF5_FILE IS_TMP ITERATION TIME
command = [
str(postprocess_cmd),
str(op_name),
str(actual_filepath),
str(disk_filepath),
str(xmf_file),
str(hdf_file),
"1" if hdf_is_tmp else "0",
str(iteration),
str(time),
]
try:
subprocess.check_call(command)
except OSError as e:
msg = f"\nFATAL ERROR: Could not find or execute postprocessing script '{command[0]}'."
print(msg)
print()
raise
except subprocess.CalledProcessError as e:
if e.returncode == 10:
msg = "Postprocessing script has requested to stop the simulation (return code 10), exiting."
vprint(msg)
sys.exit(0)
else:
msg = "\nFATAL ERROR: Failed to call I/O postprocessing command.\n{}\n"
msg = msg.format(" ".join(command))
print(msg)
print()
raise
if self.io_params.dump_is_temporary:
vprint(f">Deleting HDF5 data {filename}...")
os.remove(filename)
del self._xdmf_data_files[:]
class HDF_Reader(HDF_IO):
"""
Parallel reading of hdf/xdmf files to fill some fields in.
"""
def __new__(cls, variables, restart=None, name=None, **kwds):
return super().__new__(
cls, input_fields=None, output_fields=None, name=name, **kwds
)
def __init__(self, variables, restart=None, name=None, **kwds):
"""Read some fields data from hdf/xmdf files.
Parallel readings.
Parameters
----------
restart : int, optional
number of a specific iteration to be read, default=None,
i.e. read the file named io_params.filename (no iteration suffix).
kwds : base class arguments
Notes: restart corresponds to the iteration number which appears
in the hdf file name, i.e. the iteration at which the file
was written.
See examples in tests_hdf_io.py
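A minimal construction sketch (``velo`` and ``topo`` are assumed to be
an already defined field and topology descriptor, both illustrative)::

    from hysop.tools.io_utils import IO, IOParams
    io_params = IOParams('fields', fileformat=IO.HDF5)
    reader = HDF_Reader(variables={velo: topo}, io_params=io_params,
                        restart=10)

With restart=10, the reader opens the hdf file that was written at
iteration 10 by a matching HDF_Writer.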
"""
vnames = [f"{var.name[:3]}[{topo.id}]" for var, topo in variables.items()]
name = name or "read_{}".format(",".join(vnames))
super().__init__(input_fields=None, output_fields=variables, name=name, **kwds)
self.restart = restart
if self.restart is not None:
# filename = prefix_N, N = counter value
self._get_filename = (
lambda i=self.restart: self.io_params.filename + f"_{i:06d}" + ".h5"
)
else:
self._get_filename = lambda i=None: self.io_params.filename
@op_apply
def apply(self, simulation=None, **kwds):
# Read HDF file
self.open_hdf(count=self.restart, mode="r")
# Get the list of dataset names available in the hdf file
dsnames = self.dataset_names()
# And check if required dataset (from self.dataset)
# are all in this list.
msg = "You try to read a dataset not present in hdf file : "
for name in self.dataset:
assert name in dsnames, msg + name
self.dataset[name][self._local_compute_slices[name]] = self._hdf_file[name][
self._global_compute_slices[name]
]
self._hdf_file.close()
self._hdf_file = None
@discretized
def dataset_names(self):
"""Return the list of available names for datasets in
the required file.
"""
if self._hdf_file is None:
self.open_hdf(count=self.restart, mode="r")
return self._hdf_file.keys()
def finalize(self):
if self._hdf_file is not None:
self._hdf_file.close()